package com.newtec.dataprocess.executor.jobhandler;

import cn.hutool.core.util.StrUtil;
import com.newtec.dataprocess.core.context.XxlJobHelper;
import com.newtec.dataprocess.core.handler.IJobHandler;
import com.newtec.dataprocess.core.handler.annotation.XxlJob;
import com.newtec.dataprocess.executor.model.KettleScriptParam;
import com.newtec.dataprocess.executor.model.MyTransListener;
import com.newtec.dataprocess.kettle.common.utils.KettleParamUtils;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.stereotype.Component;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import java.io.*;

/**
 * @author: kettle 脚本模式执行器
 * @date: 2020-09-25 10:21
 * @version: 1.0
 * @description: 功能描述
 */
@Component("kettleScriptHandler")
public class KettleScriptHandler extends IJobHandler {
    /**
     * execute handler, invoked when executor receives a scheduling request
     *
     * @return
     * @throws Exception
     */
    @XxlJob(value = "scriptHandler", init = "init", destroy = "destroy")
    @Override
    public void execute() throws Exception {
        String param = XxlJobHelper.getJobParam();
        if (StrUtil.isEmpty(param)) {
            XxlJobHelper.log("请参照执行器管理的参数格式");
            XxlJobHelper.handleSuccess("请参照执行器管理的参数格式");
            return;
        }
        KettleScriptParam scriptParam = KettleScriptParam.toKettleScriptParam(param);
        if (scriptParam == null) {
            XxlJobHelper.log("请参照执行器管理的参数格式");
            XxlJobHelper.handleSuccess("请参照执行器管理的参数格式");
            return;
        }
        InputStream stream = null;
        try {
            stream = new ByteArrayInputStream(scriptParam.getFile().getBytes("utf-8"));
            Document doc = XMLHandler.loadXMLFile(stream, null, false, false);
            Node transNode = XMLHandler.getSubNode(doc, TransMeta.XML_TAG);
            TransMeta transMeta = new TransMeta();
            transMeta.loadXML(transNode, null, true);
            Trans trans = new Trans(transMeta);
            trans.addTransListener(new MyTransListener());
            trans.execute(null);
            //设置参数
            KettleParamUtils.setTransParams(trans, scriptParam.getParams());
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                XxlJobHelper.log("脚本执行报错！！");
                XxlJobHelper.handleFail("脚本执行报错！！");
                return;
            }
        } catch (KettleException e) {
            e.printStackTrace();
            XxlJobHelper.log(e);
            XxlJobHelper.log("kettle 脚本执行报错，请检查脚本！！");
            XxlJobHelper.handleFail("kettle 脚本执行报错，请检查脚本！！");
            return;
        } finally {
            if (stream != null) {
                stream.close();
            }
        }
        XxlJobHelper.handleSuccess("执行成功！！");
    }


    @Override
    public void init() throws Exception {
        super.init();
    }

    @Override
    public void destroy() throws Exception {
        super.destroy();
    }
}
